iT邦幫忙

2023 iThome 鐵人賽

DAY 2
0
AI & Data

30天胡搞瞎搞學會pyspark系列 第 2

[ Day 2 ] - Pyspark | 介紹 - DataFrame篇 - Create DataFrame

  • 分享至 

  • xImage
  •  

啊,忽然不知不覺就是第二天挑戰了,但是腦子還是一片混亂, 所以我們還是就來簡單説説作為一個資料工程師,在胡搞瞎搞馬上要入手時,第一個該開始的地方Create
(至少我是從這裡開始的)

開始囉!

身為一個Data Engineer, 在日常的工作中, 主要的工作內容就是針對現有的資料進行清理(Data Cleaning), 協助分析師抑或是資料科學家進行後續的資料加值,
那在做資料清理的時候,最重要也是第一步會是什麼呢?
當然就是,你要先取的資料,並且讀入你的工具中

這邊簡單介紹一下主要有5種方法可以來Create 你的DataFrame,幫助你做後續的清理工作!


  1. 使用RDD來Create
  2. 利用Python的DataFrame來Create
  3. 使用List 來Create
  4. 利用CSV/Json來做Create
  5. 使用DB Data來做Create

1.使用RDD來Create

RDD = Resilient Distributed Datasets (彈性分散式資料集), 顧名思義它可以分散成多個分區,每一個分區就是你的dataset 的片段, 也就是這樣的架構,可以達到分散式運算的效果。
所以我們將資料藉由RDD載入DataFrame中, 在後續的運算上就可以利用分散式架構達到快速運算的目的
rdd.toDF = 將data / column Header(ColumnName)進行mapping
df.show() = show出你的DataFrame的長相,通常會像是表格(其中也有一些有趣的parameter可以玩,這個後續再介紹)
df.printSchema() = 將你的DataType 顯示出來

情境:
對我來說, 因為這樣的資料需要手動輸入, 所以通常都是拿來做範例的時候使用(就是像現在啦)
下面範例:

rdd = sc.parallelize([("Carmen", 23, "吉普賽女郎"), ("Don José", 25, "青年衛兵下士"), ("Escamillo", 30, "鬥牛士")])
df = rdd.toDF(["name", "age", "Role"])
df.show()
df.printSchema()
'''
+---------+---+------------+OUTPUT+---------+---+------------+
+---------+---+------------+
|     name|age|        Role|
+---------+---+------------+
|   Carmen| 23|  吉普賽女郎|
| Don José| 25|青年衛兵下士|
|Escamillo| 30|      鬥牛士|
+---------+---+------------+

root
 |-- name: string (nullable = true)
 |-- age: long (nullable = true)
 |-- Role: string (nullable = true)
+---------+---+------------+OUTPUT+---------+---+------------+
'''

https://ithelp.ithome.com.tw/upload/images/20230917/2016302951Ul3LOO6A.png

2. 利用Python的DataFrame來Create

在使用這個方法的時候要注意, 需要import pandas as pd的library 去create DataFrame 然後再轉成Spark的DataFrame!

這時候可能會有人好奇想問,既然可以直接利用pandas 這麼主流的library去做資料處理及分析, 那為什麼還需要Spark呢?
當然在處理資料量較少的狀況可以用Pandas去做Data Cleaning, 但是當你遇到TB等級的資料時, pandas 因為缺乏分散式運算的能力, 相對的也就無法快速地處理現有的資料。

情境:
所以當你想要把pandas 的DataFrame轉換成更有效率的Spark DataFrame做Data Cleaning時
下面範例:

import pandas as pd
df = pd.DataFrame([["Carmen", 23, "吉普賽女郎"], ["Don José", 25, "青年衛兵下士"], ["Escamillo", 30, "鬥牛士"]],
                  columns=["name", "age", "Role"])
print(">> DataFrame:")
print(df)
print("\n")
Spark_df = spark.createDataFrame(df)
print(">> SparkDataFrame:")
Spark_df.show()
'''
+---------+---+------------+OUTPUT+---------+---+------------+
>> DataFrame:
        name  age    Role
0     Carmen   23   吉普賽女郎
1   Don José   25  青年衛兵下士
2  Escamillo   30     鬥牛士


>> SparkDataFrame:
+---------+---+------------+
|     name|age|        Role|
+---------+---+------------+
|   Carmen| 23|  吉普賽女郎|
| Don José| 25|青年衛兵下士|
|Escamillo| 30|      鬥牛士|
+---------+---+------------+
+---------+---+------------+OUTPUT+---------+---+------------+
'''

https://ithelp.ithome.com.tw/upload/images/20230917/201630298BfcVHWF31.png

3. 使用List 來Create

恩 對 就是這樣

情境:
對我來說, 因為這樣的資料需要手動輸入, 所以通常都是拿來做範例的時候使用(就是像現在啦)
下面範例:

list_values = [["Carmen", 23, "吉普賽女郎"], ["Don José", 25, "青年衛兵下士"], ["Escamillo", 30, "鬥牛士"]]
Spark_df = spark.createDataFrame(list_values, ["name", "age", "Role"])
Spark_df.show()
'''
+---------+---+------------+OUTPUT+---------+---+------------+
+---------+---+------------+
|     name|age|        Role|
+---------+---+------------+
|   Carmen| 23|  吉普賽女郎|
| Don José| 25|青年衛兵下士|
|Escamillo| 30|      鬥牛士|
+---------+---+------------+
+---------+---+------------+OUTPUT+---------+---+------------+
'''

4. 利用CSV/Json來做Create

終於來到重頭戲之一, 也就是直接讀CSV/Json 啦

4.1 Read CSV

在讀取CSV資料的時候要非常小心他的delimiter,正確的設定delimiter 就可以讀到被切的很乾淨的資料, 才不會發生跑版(?)的問題
同場加映:read.option
CSV:
https://ithelp.ithome.com.tw/upload/images/20230917/201630296TcOjCXFn5.png

情境:
大部分的情境中,讀取CSV來做初步的Data Cleaning 可能是最常見的一種CASE, 總而言之很好用

df = spark.read.option("header", "true")\
    .option("inferSchema", "true")\
    .option("delimiter", ",")\
    .csv("/home/glue_user/workspace/jupyter_workspace/Carmen_musical.csv")
df.show(5)
df.printSchema()
'''
+---------+---+------------+OUTPUT+---------+---+------------+
+---------+----+-------------+
|     name| age|         Role|
+---------+----+-------------+
|   Carmen|23.0|   吉普賽女郎|
| Don José|25.0| 青年衛兵下士|
|Escamillo|30.0|       鬥牛士|
+---------+----+-------------+

root
 |-- name: string (nullable = true)
 |--  age: double (nullable = true)
 |--  Role: string (nullable = true)
+---------+---+------------+OUTPUT+---------+---+------------+
'''

https://ithelp.ithome.com.tw/upload/images/20230917/20163029INBUvF8TUI.png

4.2 Read Json

需要特別注意的是,使用超過2.3以上的spark版本才可以使用以下語法!
json:

[
  {
    "name": "Carmen",
    "age": 23,
    "Role": "吉普賽女郎"
  },
  {
    "name": "Don José",
    "age": 25,
    "Role": "青年衛兵下士"
  },
  {
    "name": "Escamillo",
    "age": 30,
    "Role": "鬥牛士"
  }
]

情境:
讀取json來做初步的Data Cleaning 也是另一個很方便的case,也很好用

df = spark.read.option("multiline","true").json("/home/glue_user/workspace/jupyter_workspace/Carmen_musical.json")

df.show(5)
df.printSchema()
'''
+---------+---+------------+OUTPUT+---------+---+------------+
+------------+---+---------+
|        Role|age|     name|
+------------+---+---------+
|  吉普賽女郎| 23|   Carmen|
|青年衛兵下士| 25| Don José|
|      鬥牛士| 30|Escamillo|
+------------+---+---------+

root
 |-- Role: string (nullable = true)
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
+---------+---+------------+OUTPUT+---------+---+------------+
'''

5.使用DB Data來做Create

各個DB幾乎都可以連,只要你提供正確的連線資訊就可以囉!

情境:
清理大量資料,去資料出來做分析或是建資料管道(Data Pipeline)的時候很好用喔~

## 使用mysql jdbc作為連線讀取DB中的data
url = "jdbc:mysql://localhost:3306/test"
df = spark.read.format("jdbc") \
 .option("url", url) \
 .option("dbtable", "runoob_tbl") \
 .option("user", "root") \
 .option("password", "8888") \
 .load()\
df.show()
df.printSchema()
'''
+---------+---+------------+OUTPUT+---------+---+------------+
+------------+---+---------+
|        Role|age|     name|
+------------+---+---------+
|  吉普賽女郎| 23|   Carmen|
|青年衛兵下士| 25| Don José|
|      鬥牛士| 30|Escamillo|
+------------+---+---------+

root
 |-- Role: string (nullable = true)
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
+---------+---+------------+OUTPUT+---------+---+------------+
'''

如果有任何不理解、錯誤或其他方法想分享的話,歡迎留言給我!喜歡的話,也歡迎按讚訂閱!

我是 Vivi,一位在雲端掙扎的資料工程師!我們下一篇文章見!Bye Bye~
【本篇文章將同步更新於個人的 Medium,期待與您的相遇!】


上一篇
[ Day 1 ] - Pyspark | 介紹 - 什麼是Pyspark?
下一篇
[ Day 3 ] - Pyspark | 介紹 - DataFrame篇 - Data Overview
系列文
30天胡搞瞎搞學會pyspark30
圖片
  直播研討會
圖片
{{ item.channelVendor }} {{ item.webinarstarted }} |
{{ formatDate(item.duration) }}
直播中

尚未有邦友留言

立即登入留言